Published on

flutter 并发执行任务

Authors

isolate 与 future

它们间的区别是,isolate 创建了隔离区,是在一个新的线程上执行。

而future只是把任务放入EventLoop(事件循环)的末尾,它与其他任务只有时序上的区别,并不是在新的线程上执行。

isolate and future

Future的使用

Future使用非常简单

var job1 = Future((){
    //....do some heavy jobs
});

var job2 = Future((){
    //....do some heavy jobs
});

// 等待执行完毕,并获取返回值
var results = Future.wait([job1, job2])

Isolate的使用

isolate 生成一个与main隔离的dartVM执行区域,每个isolate直接通过消息来传递数据。创建的isolate并不能获得主isolate的内存。

Isolate.run()

最简单的方式,通过Isolate.run()创建一个闭包函数启动并行任务,并通过Future.wait(),等待执行完毕,获取返回值。

final jsonDataFuture = Isolate.run(()async{
    final fileData = await File(filename).readAsString();
    final jsonData = jsonDecode(fileData) as Map<String, dynamic>;
    return jsonData;
})

final printFuture = Isolate.run(()async{
    print("isolate...")
    return ;
})

var results = await Future.wait([jsonDataFuture, printFuture]);
for (var result in results){
    print(result);
}

Isolate.spawn()

Ioslate.spawn()在同一个dart文件代码中创建一个隔离区,并可以传递一个参数。

void SayHello(List<String> s){
    // It will print "hi", "hello"
    print(s)
}

void main(){
    Isolate.spawn(SayHello, ["hi", "hello"])
}

在两个isolate之间通信,需要创建一个类似go channel的“管道”来通信,通过ReceivePort来创建管道。

list和send不能包含在死循环中,而且消息发送是半双工的,只能一方发,另一方收,不能同时发和收。

void main(){
  var receivePort = ReceivePort();
  
  var musicIsolate = await Isolate.spawn(playMusic, receivePort.sendPort);
  receivePort.listen((msg){
    if(msg is SendPort){
      msg.send("i'm main isolate...");
    }else if (msg is String){
      switch (msg){
        case "kill":
          receivePort.close();
          musicIsolate.kill();
        default:
          print(msg);
      }
    }
  });
}

void playMusic(SendPort sendPort){

  // 创建一个管道,让主隔离区也可以发信息过来
  final receivePort = ReceivePort();
  sendPort.send(receivePort.sendPort);
  receivePort.listen((msg){
    print("Main Isolate:$msg");
  });
  
  while(true){
    print("play music.........");
    sleep(Duration(seconds: 2));
  }
}

receivePort需要关闭,否则会阻塞主isolate。

Isolate.spawnUri()

Isolate.spawnUri 通过Uri获取的dart源码启动一个隔离区,必须包含main函数,并可以向main函数传递参数和信息。

这里通过一个本地的dart文件来测试这个功能。

main.dart

void main(){

  var childIsolateReceivePort = ReceivePort();
  var childIsolate = await Isolate.spawnUri(
    Uri(path: "child_isolate.dart"), 
    ["-v", "22.0"], 
    childIsolateReceivePort.sendPort);

  childIsolateReceivePort.listen((msg) {
      if(msg case String _){
        switch(msg){
          case "finished":
            childIsolateReceivePort.close();
            childIsolate.kill();
        }
      print(msg);
    }
  });
}

child_isolate.dart

void main(List<String> args,  SendPort mainSendPort){
  print("Get args:$args ");
  mainSendPort.send("start");
  sleep(Duration(seconds: 1));
  mainSendPort.send("processing");
  sleep(Duration(seconds: 1));
  mainSendPort.send("finished");
}

Stream流并行获取

这个接口是flutter对dart:async的扩充,只能在flutter里使用。

Stream<Map<String, dynamic>> countNumbers(String name, int n) async* {
  for (int i = n; i < n+5; i++) {
    yield {"name": name, "num": i};
  }
}

void main()  async {

await for (var result in StreamGroup.merge([countNumbers("task1",0), countNumbers("task2",5)])){
  debugPrint("$result");
}
}

可以看到task1与task2交替输出

Compute计算函数

flutter专有的函数

Future<bool> isPrime(int value){
  return compute(_calculate, value);
} 

bool _calculate(int value){
  if(value == 1){
    return false;
  }
  for(int i=2; i < value; i++){
    if(value%i == 0){
      return false;
    }
  }
  return true;
}

void main()  async {

  var res = await isPrime(127);
  debugPrint("isPrime:$res");
}

跟Isolate.spawn()用法类似。